package com.shujia.spark.sql

import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window

object Demo3DataFrameApi {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession
      .builder()
      .appName("source")
      .master("local")
      .config("spark.sql.shuffle.partitions", "1")
      .getOrCreate()

    //import implicit conversions
    import spark.implicits._
    //import all of Spark's built-in SQL functions
    import org.apache.spark.sql.functions._


    val student: DataFrame = spark.read.json("data/students.json")


    /**
      * show: behaves like an action operator
      *
      */
    //shows the first 20 rows by default
    student.show()

    //specify the number of rows to show
    student.show(100)

    //show full column values without truncation
    student.show(false)
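
    //show also takes a row count and a truncate flag together
    student.show(5, truncate = false)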


    /**
      * select
      *
      */

    //using column objects
    student.select($"name", $"age" + 1 as "age").show()
    //using plain column names; expressions cannot be computed this way
    student.select("name", "clazz").show()

    //using SQL expressions
    student.selectExpr("name", "age + 1 as age").show()

    student.select(expr("name"), expr("age + 1") as "age").show()
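
    //col() from the functions package is another way to build column
    //objects, equivalent to the $"..." syntax above
    student.select(col("name"), col("age") + 1 as "age").show()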


    /**
      * where
      *
      */

    student.where($"age" > 23).show()
    student.where("age < 22").show()


    /**
      * group
      *
      */


    student.groupBy($"clazz").count().show()
    student.groupBy("clazz").count().show()


    /**
      * agg: aggregation
      *
      */

    student.agg(count($"id") as "num").show()

    student
      .groupBy("clazz")
      .agg(count("clazz") as "num")
      .show()

    student
      .groupBy($"clazz")
      .agg(avg($"age") as "avgAge")
      .show()
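
    //agg can compute several aggregations in a single pass
    student
      .groupBy($"clazz")
      .agg(
        count($"id") as "num",
        max($"age") as "maxAge",
        min($"age") as "minAge")
      .show()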


    /**
      * join (the score data is read from a CSV file first)
      *
      */


    val score: DataFrame = spark
      .read
      .format("csv")
      .option("sep", ",")
      .schema("id STRING,cid STRING,sco DOUBLE")
      .load("data/score.txt")


    //alias the DataFrames
    val stuAS: Dataset[Row] = student.as("stu")
    val scoAS: Dataset[Row] = score.as("sco")

    //join on an explicit condition
    val joinDF: DataFrame = stuAS.join(scoAS, $"stu.id" === $"sco.id", "inner")

    joinDF.show()

    //join on named columns (the join key appears only once in the result)
    stuAS.join(scoAS, List("id"), "inner").show()
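
    //other join types work the same way; a left outer join also keeps
    //students that have no score records
    stuAS.join(scoAS, $"stu.id" === $"sco.id", "left_outer").show()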


    /**
      * sort
      *
      */

    score
      .groupBy($"id")
      .agg(sum($"sco") as "sumSco")
      .sort($"sumSco".desc) //descending order
      .select($"id", $"sumSco")
    //.show()
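
    //sort/orderBy accept several columns; later columns break ties
    score
      .groupBy($"id")
      .agg(sum($"sco") as "sumSco")
      .orderBy($"sumSco".desc, $"id".asc)
    //.show()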


    /**
      * over: window functions
      *
      */

    //get the top 10 students by total score in each class


    student
      .join(score, "id")
      .groupBy($"id", $"clazz")
      .agg(sum($"sco") as "sumSco")
      .select($"id", $"clazz", $"sumSco", row_number() over Window.partitionBy($"clazz").orderBy($"sumSco".desc) as "r")
      .where($"r" <= 10)
    //.show()
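
    //dense_rank() is an alternative to row_number() that gives tied
    //scores the same rank (a sketch of the same top-10 query)
    student
      .join(score, "id")
      .groupBy($"id", $"clazz")
      .agg(sum($"sco") as "sumSco")
      .select($"id", $"clazz", $"sumSco", dense_rank() over Window.partitionBy($"clazz").orderBy($"sumSco".desc) as "r")
      .where($"r" <= 10)
    //.show()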

    /**
      * withColumn: add a column
      *
      */

    student
      .join(score, "id")
      .groupBy($"id", $"clazz")
      .agg(sum($"sco") as "sumSco")
      .withColumn("r", row_number() over Window.partitionBy($"clazz").orderBy($"sumSco".desc))
      .where($"r" <= 10)
    //.show()
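
    //withColumn can also derive a new field from existing columns;
    //the 6-subject divisor below is an assumption for illustration
    student
      .join(score, "id")
      .groupBy($"id", $"clazz")
      .agg(sum($"sco") as "sumSco")
      .withColumn("avgSco", $"sumSco" / 6)
    //.show()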


    /**
      * sql
      *
      */

    student.createOrReplaceTempView("student")
    score.createOrReplaceTempView("score")

    val resultDF: DataFrame = spark.sql(
      """
        |select * from (
        |  select id, clazz, sumSco,
        |         row_number() over (partition by clazz order by sumSco desc) as r
        |  from (
        |    select a.id, a.clazz, sum(b.sco) as sumSco
        |    from student as a
        |    join score as b
        |    on a.id = b.id
        |    group by a.id, a.clazz
        |  ) as c
        |) as d
        |where r <= 10
      """.stripMargin)

    //resultDF.show()
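
    //the SQL result is a regular DataFrame and could be written back out;
    //the output path below is just a placeholder
    //resultDF
    //  .write
    //  .mode("overwrite")
    //  .option("header", "true")
    //  .csv("data/top10")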


    /**
      * explode
      *
      */

    val linesDF: DataFrame = spark.read
      .format("csv")
      .option("sep", "\t")
      .schema("lines STRING")
      .load("data/words.txt")


    linesDF
      .select(explode(split($"lines", ",")) as "word")
      .groupBy($"word")
      .agg(count($"word") as "c")
      .show()
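
    //the same word count with the typed Dataset API instead of explode;
    //flatMap produces a Dataset[String] whose column is named "value"
    linesDF
      .as[String]
      .flatMap(_.split(","))
      .groupBy($"value")
      .count()
      .show()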


    /**
      * map join (broadcast join)
      *
      * tables of roughly 100 MB or less can be broadcast
      *
      */

    student.hint("broadcast").join(score, "id").show()
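
    //broadcast() from the functions package forces a map join explicitly;
    //Spark also broadcasts automatically for tables below the
    //spark.sql.autoBroadcastJoinThreshold setting
    broadcast(student).join(score, "id").show()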
  }

}
